package com.fzu.geometa.search.consumer;

import cn.hutool.core.date.DateUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fzu.geometa.mq.constant.Topic;
import com.fzu.geometa.mq.model.Event;
import com.fzu.geometa.search.model.Metadata;
import com.fzu.geometa.search.service.ElasticsearchService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class MetadataEventConsumer {
    @Resource
    ElasticsearchService elasticsearchService;
    ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = {Topic.PUBLISH})
    public void handleSaveMessage(ConsumerRecord record) {
        Event event = parseEvent(record);
        if (event == null) {
            return;
        }
        // 创建元数据
        Metadata metadata = new Metadata();
        metadata.setId(event.getEntityId());
        metadata.setCoverageId(event.getData("coverageId").toString());
        metadata.setSubtype(event.getData("subtype").toString());
        metadata.setDescription(event.getData("description").toString());
        metadata.setCreateTime(DateUtil.parse(event.getData("createTime").toString()));
        metadata.setContent(event.getData("content").toString());

        elasticsearchService.save(metadata);
    }

    @KafkaListener(topics = {Topic.RETRACT})
    public void handleDeleteMessage(ConsumerRecord record) {
        Event event = parseEvent(record);
        if (event == null) {
            return;
        }
        Long cid = event.getEntityId();
        elasticsearchService.deleteById(cid);
    }


    private Event parseEvent(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            log.error("消息内容为空");
            return null;
        }
        String eventJson = record.value().toString();
        Event event = null;
        try {
            event = objectMapper.readValue(eventJson, Event.class);
        } catch (JsonProcessingException e) {
            log.error("消息格式错误");
            throw new RuntimeException(e);
        }
        return event;
    }
}
